package com.sczc.esm27.collection.netty;

import com.alibaba.fastjson.JSONObject;
import com.sczc.esm27.collection.core.message.IMessageHandle;
import com.sczc.esm27.collection.core.message.MessageHandleContext;
import com.sczc.esm27.collection.core.util.ContextUtils;
import com.sczc.esm27.collection.devices.response.SensorDataHandle;
import com.sczc.esm27.collection.entity.ESM27ResponseMessageEntity;
import com.sczc.esm27.collection.mq.TopicSender;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.net.InetSocketAddress;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private final Logger log = LogManager.getLogger(getClass());
    private MessageHandleContext<ESM27ResponseMessageEntity, Object> handleContext = new MessageHandleContext<>();
    public static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    public NettyServerHandler() {
        handleContext.addHandleClass(new SensorDataHandle());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
//            InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
//            String clientIp = insocket.getAddress().getHostAddress();
            if (msg instanceof ESM27ResponseMessageEntity) {
                ESM27ResponseMessageEntity entity = (ESM27ResponseMessageEntity) msg;
                handleContext.handle(ctx, entity);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 抛弃收到的数据
            ReferenceCountUtil.release(msg);
        }
    }


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        int clientPort = insocket.getPort();
        //获取连接通道唯一标识
        ChannelId channelId = ctx.channel().id();
        //如果map中不包含此连接，就保存连接
        if (channelGroup.contains(ctx.channel())) {
            log.info("客户端【" + channelId + "】是连接状态，连接通道数量: " + channelGroup.size());
        } else {
            //保存连接
            channelGroup.add(ctx.channel());
            log.info("客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
            log.info("连接通道数量: " + channelGroup.size());
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        ChannelId channelId = ctx.channel().id();
        channelGroup.remove(ctx.channel());
        log.info("客户端【" + channelId + "】断开netty服务器[IP:" + clientIp + "]");
        log.info("连接通道数量: " + channelGroup.size());
    }


    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.READER_IDLE) {
                log.info("客户端【" + ctx.channel().id() + "】读超时，断开连接");
                ctx.close();
                log.info("客户端【" + ctx.channel().id() + "】断开netty服务器[IP:" + clientIp + "]");
                log.info("连接通道数量: " + channelGroup.size());
            } else if (e.state() == IdleState.WRITER_IDLE) {
                log.info("客户端【" + ctx.channel().id() + "】写超时，断开连接");
                ctx.close();
                log.info("客户端【" + ctx.channel().id() + "】断开netty服务器[IP:" + clientIp + "]");
                log.info("连接通道数量: " + channelGroup.size());
            } else if (e.state() == IdleState.ALL_IDLE) {
                log.info("客户端【" + ctx.channel().id() + "】读写超时，断开连接");
                ctx.close();
                log.info("客户端【" + ctx.channel().id() + "】断开netty服务器[IP:" + clientIp + "]");
                log.info("连接通道数量: " + channelGroup.size());
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        log.info(ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + channelGroup.size());
    }

}
